---
layout: default
title: "Quarkus, Kafka, Camel Servlet, Wiring it Together"
date: 2019-07-06 05:00:00 +0200
published: 2019-07-04 05:00:00 +0200
comments: true
categories: development
tags: [quarkus, kubernetes, camel, kafka, smallrye]
github: "https://github.com/alainpham/quarkus-kafka-camel-servlet"
---

<p>
    When using Apache Camel with Quarkus as of today, we are limited to a small number of Camel connectors. One
    important connector that is missing is the Apache Kafka connector.
    A Kafka connector is available for Quarkus through the SmallRye Kafka extension, though. So in this
    article, I will show how to
    wire the SmallRye Kafka connector and Camel together.
    We will also use the camel-servlet component to reuse the Undertow HTTP endpoint provided by Quarkus and, to
    spice things up a little, we will use the XML DSL of Camel.
    All of this will be compiled to a native executable in the end.
</p>
<!--more-->

<h2>Set up a project</h2>

<p>In a previous article I showed how to create a hello world using Camel and Quarkus. Here we will try to do something
    more advanced. Let's start by setting up the project.</p>

{% highlight shell %}

mkdir quarkus-kafka-camel-servlet

cd quarkus-kafka-camel-servlet

mvn io.quarkus:quarkus-maven-plugin:0.18.0:create \
-DprojectGroupId=agilabs \
-DprojectArtifactId=quarkus-kafka-camel-servlet \
-DclassName="agilabs.GreetingResource" \
-Dpath="/hello"
{% endhighlight %}

<p>
    Add the following dependencies to the pom.xml file.
</p>
{% highlight xml %}
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-camel-core</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-camel-servlet</artifactId>
</dependency>
{% endhighlight %}

<h2>Configure Camel</h2>

<h3>Application properties</h3>

<p>Configure application properties in src/main/resources/application.properties</p>

<pre>
quarkus.camel.servlet.url-patterns = /camel/*
%dev.quarkus.camel.routes-uris = camel/routes.xml
quarkus.camel.routes-uris=file:/camel/routes.xml
quarkus.camel.defer-init-phase = true
</pre>

<p>
    All requests that go to /camel/* are mapped to the Camel servlet component. In dev mode, the Camel routes are
    loaded from the file routes.xml, located on the classpath in the resource folder camel.
</p>
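<p>
    To make the role of the <code>%dev.</code> prefix concrete, here is a minimal plain-Java sketch of profile-aware
    property lookup. It is a simplified model only, not the actual Quarkus configuration code, and the class name
    <code>ProfileLookup</code> and its <code>resolve</code> method are made up for this illustration.
</p>

```java
import java.util.Map;

// Simplified model of profile-aware property lookup; not the real Quarkus implementation.
class ProfileLookup {

    // Returns the value of "%<profile>.<key>" when present, otherwise the value of the plain key.
    static String resolve(Map<String, String> props, String profile, String key) {
        String profiled = props.get("%" + profile + "." + key);
        return profiled != null ? profiled : props.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                "%dev.quarkus.camel.routes-uris", "camel/routes.xml",
                "quarkus.camel.routes-uris", "file:/camel/routes.xml");

        // In dev mode the prefixed entry wins; any other profile falls back to the plain key.
        System.out.println(resolve(props, "dev", "quarkus.camel.routes-uris"));
        System.out.println(resolve(props, "prod", "quarkus.camel.routes-uris"));
    }
}
```

<p>
    With the two entries from application.properties, the dev profile resolves to the classpath location, while every
    other profile resolves to the external file.
</p>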
<p>
    When launched in "production mode" with the native executable, the resources are not included in the binary by
    default. I'm externalizing the routes.xml file to the folder /camel so that it can be set
    and changed dynamically. This is actually quite interesting because we get a configurable native executable
    that we can fine-tune by switching the routes at run time.
    For some reason, when I tried to change the property quarkus.camel.routes-uris at native run time, its new value
    did not seem to be taken into account. So this file path is effectively hard-coded right now.
</p>
<p><strong>Edit, July 6, 2019: I discovered that there are different <a
            href="https://quarkus.io/guides/extension-authors-guide" target="_blank">Configuration Phases</a>:
        BUILD_TIME, BUILD_AND_RUN_TIME_FIXED and RUN_TIME.
        It turns out this property is set to BUILD_AND_RUN_TIME_FIXED, so it is not read dynamically at native run time for the moment.
        More details <a target="_blank" href="https://github.com/quarkusio/quarkus/issues/3116">here</a>.
        </strong></p>
<p>
    The quarkus.camel.defer-init-phase property is set to true because it seems that the Camel context is otherwise
    created before the CDI bean wiring and dependency injection happen.
    I need to do some further digging to fully understand how this works.
</p>

<h3>
    Camel Routes XML Files
</h3>

<p>
    Let's set up the routes in the src/main/resources/camel/routes.xml file.
</p>

{% highlight xml %}
<?xml version="1.0" encoding="UTF-8" ?>
<routes xmlns="http://camel.apache.org/schema/spring" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <route id="receive-kafka-events">
        <from uri="direct:receive-events"></from>
        <log message="Received from kafka broker : ${body}"></log>
    </route>

    <route id="send-kafka-events">
        <from uri="servlet:send"></from>
        <log message="Sending"></log>
        <convertBodyTo type="java.lang.String"></convertBodyTo>
        <bean ref="smallrye-producer" method="produce(${body})"></bean>
        <setBody>
            <constant>Message sent</constant>
        </setBody>
    </route>
</routes>
{% endhighlight %}

<p>The first route consumes messages coming from Kafka. Later on, we will wire a Camel producer template so that any
    other class in the application can send messages to this route.</p>
<p>The second route listens to HTTP requests coming in on the path <strong>http://0.0.0.0:8080/camel/send</strong>.
    It uses a bean that wraps an emitter to send messages to Kafka.
</p>

<h2>Configure sinks for Kafka topics</h2>

<h3>Application properties</h3>

<pre>
mp.messaging.incoming.in-events.connector=smallrye-kafka
mp.messaging.incoming.in-events.topic=events
mp.messaging.incoming.in-events.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

mp.messaging.outgoing.out-events.connector=smallrye-kafka
mp.messaging.outgoing.out-events.topic=events
mp.messaging.outgoing.out-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
</pre>

<p>We configure one smallrye-kafka connector for incoming messages and one for outgoing messages. Both use the same topic, events.</p>

<h3>Create a consumer</h3>

<p>Now let's go ahead and create a consumer with the @Incoming annotation. We need to inject a Camel
    ProducerTemplate instance into the consumer so that the message can be forwarded to our Camel direct endpoint.</p>

{% highlight java %}
package agilabs;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.camel.ProducerTemplate;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class Consumer {
    
    @Inject
    ProducerTemplate camelProducer;

    @Incoming("in-events")
    public void consume(String message) {              
        camelProducer.sendBody("direct:receive-events", message);
    }
}
{% endhighlight %}

<h3>Create a producer</h3>

<p>
    Now let's create a producer. Note that this producer will be used by the Camel XML context, which looks up its methods dynamically through reflection.
    We therefore need to register this class for reflection so that it still works once compiled to native code.
</p>

{% highlight java %}
package agilabs;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import io.quarkus.runtime.annotations.RegisterForReflection;
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.Stream;

@ApplicationScoped
@RegisterForReflection
public class Producer {
    
    @Inject
    @Stream("out-events")
    Emitter<String> emitter;

    public void produce(String message) {
        emitter.send(message);       
    }
}
{% endhighlight %}
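<p>
    To illustrate why reflection matters here, the following plain-Java sketch looks up a method by name at run time
    and invokes it. This is only an illustration of the general mechanism, not Camel's actual bean invocation code;
    <code>ReflectiveCall</code> and <code>FakeProducer</code> are hypothetical names, and the fake producer records
    the message instead of sending it to Kafka.
</p>

```java
import java.lang.reflect.Method;

// Illustration only: a reflective call by method name, similar in spirit to a bean invocation.
class ReflectiveCall {

    // Stand-in for the Producer bean; it records the message instead of sending it to Kafka.
    static class FakeProducer {
        String lastMessage;

        public void produce(String message) {
            lastMessage = message;
        }
    }

    // Looks up a public method taking a single String by name at run time and invokes it.
    static void invoke(Object bean, String methodName, String body) {
        try {
            Method method = bean.getClass().getMethod(methodName, String.class);
            method.invoke(bean, body);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot call " + methodName, e);
        }
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        invoke(producer, "produce", "Hello there!");
        System.out.println(producer.lastMessage);
    }
}
```

<p>
    On a regular JVM this lookup just works. In a native executable, the metadata for such a lookup is only available
    for classes that were registered for reflection at build time, which is what @RegisterForReflection takes care of.
</p>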

<h2>Wiring it all together</h2>

<p>Now that we have everything configured, we need to wire it all together: register the SmallRye producer in the
    Camel context and inject a Camel producer template into the Quarkus CDI context.</p>
<p>As mentioned earlier, it seems that the Camel context is started earlier than the CDI context, so when Camel
    tries to look up a bean called smallrye-producer, it fails to start because it can't find it.
    This also comes from the fact that ApplicationScoped beans are instantiated lazily.
    Since the smallrye-producer is never called in the CDI context, it would never be initialized.
    For these reasons we need to observe the Camel InitializedEvent and register the SmallRye producer at that specific time.
</p>
<p>
    The @Produces annotation provides the Camel producer template to the CDI context.
</p>

{% highlight java %}
package agilabs;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import io.quarkus.camel.core.runtime.InitializedEvent;

public class CamelWiring{

    @Inject
    Producer smallRyeProducer;

    @Inject
    CamelContext camelContext;

    public void registerSmallryeEmitter( @Observes InitializedEvent ev ) throws Exception {
        camelContext.getRegistry().bind("smallrye-producer", smallRyeProducer);
    }

    @Produces
    public ProducerTemplate buildCamelProducerTemplate() throws Exception {
        return camelContext.createProducerTemplate();
    }
}
{% endhighlight %}
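<p>
    The ordering problem described above can be modelled in a few lines of plain Java. This is a toy model under my
    own assumptions, not how Camel or CDI are implemented: <code>Lazy</code> and <code>Registry</code> are
    hypothetical helper classes that only mimic a lazily created bean and a name-based bean registry.
</p>

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy model of the start-up ordering issue; the names here are hypothetical, not Camel or CDI APIs.
class LazyWiringSketch {

    // Mimics a lazily created bean: the instance is built on the first get() only.
    static class Lazy<T> implements Supplier<T> {
        private final Supplier<T> factory;
        private T instance;

        Lazy(Supplier<T> factory) {
            this.factory = factory;
        }

        @Override
        public synchronized T get() {
            if (instance == null) {
                instance = factory.get();
            }
            return instance;
        }

        synchronized boolean isCreated() {
            return instance != null;
        }
    }

    // Mimics a name-to-bean registry in which routes look up their beans.
    static final class Registry {
        private final Map<String, Object> beans = new HashMap<>();

        void bind(String name, Object bean) {
            beans.put(name, bean);
        }

        Object lookup(String name) {
            return beans.get(name);
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Lazy<Object> producer = new Lazy<>(Object::new);

        // Before the "initialized" callback nothing is bound and the bean does not exist yet.
        System.out.println(registry.lookup("smallrye-producer") == null);
        System.out.println(producer.isCreated());

        // The callback forces creation and binds the instance under the name the route expects.
        registry.bind("smallrye-producer", producer.get());
        System.out.println(registry.lookup("smallrye-producer") != null);
    }
}
```

<p>
    In the real application, the observer method plays the role of the callback: injecting the Producer and binding it
    explicitly makes sure the instance exists and is visible under the name used in routes.xml.
</p>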

<h2>Compiling and testing the project</h2>

<p>Now let's compile it to native code. The compilation is quite memory and CPU intensive. I guess this is due to the costly compilation of XML support libraries.
    At peak I observed 9.8 GB of memory consumption. The whole compilation process took about 7 to 8 minutes.</p>

<pre>
git clone https://github.com/alainpham/quarkus-kafka-camel-servlet.git
cd quarkus-kafka-camel-servlet
mvn package -Pnative
</pre>

<p>
    If you are in a rush, I have already built a container image for you. You can run everything with only one command
    using the docker-compose.yml file. It will boot a ZooKeeper instance and a Kafka broker, using images provided by the Strimzi project, and
    also a container with the current project.
</p>

<pre>
docker-compose up
</pre>

<p>You can also get the container image directly here:
<br>
        <a href="https://hub.docker.com/r/alainpham/quarkus-kafka-camel-servlet">https://hub.docker.com/r/alainpham/quarkus-kafka-camel-servlet</a>
</p>

<p>Test it!</p>
<pre>
curl http://localhost:8080/camel/send -X POST -H "Content-Type: text/plain" -d 'Hello there!'
</pre>

<p>The output should look like this:</p>

<pre>
quarkus-kafka-camel-servlet_1  | 2019-07-05 13:12:58,025 INFO  [send-kafka-events] (executor-thread-1) Sending
quarkus-kafka-camel-servlet_1  | 2019-07-05 13:12:58,027 INFO  [io.sma.rea.mes.kaf.KafkaSink] (vert.x-eventloop-thread-1) Message org.eclipse.microprofile.reactive.messaging.Message$$Lambda$5e1799c60041209b57937765b33186671a72f6f8@7f35f7425e98 sent successfully to Kafka topic 'events'
quarkus-kafka-camel-servlet_1  | 2019-07-05 13:12:58,027 INFO  [receive-kafka-events] (vert.x-eventloop-thread-0) Received from kafka broker : Hello there!
</pre>
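<p>
    The same request can also be sent from Java instead of curl. The sketch below uses the HTTP client that ships with
    Java 11 and later; it assumes the application is reachable on localhost:8080, and <code>SendEvent</code> and
    <code>buildRequest</code> are names I made up for this example.
</p>

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sends the same kind of POST as the curl command; requires Java 11+ and the application running locally.
class SendEvent {

    // Builds a text/plain POST request for the Camel servlet endpoint.
    static HttpRequest buildRequest(String baseUrl, String message) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/camel/send"))
                .header("Content-Type", "text/plain")
                .POST(HttpRequest.BodyPublishers.ofString(message))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = buildRequest("http://localhost:8080", "Hello there!");
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // Prints the status code and whatever body the route returned.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

<p>
    Since the route replaces the body with a constant after calling the producer, the response body should be that
    constant if the route ran to the end.
</p>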

<p>Thanks for reading!</p>


<p><strong>Resources I used for inspiration</strong></p>
<ul>
    <li>
        <a href="https://quarkus.io/guides/kafka-guide">Quarkus Kafka Guide</a>
    </li>
    <li>
        <a href="https://quarkus.io/guides/cdi-reference">Quarkus - Contexts and Dependency Injection</a>
    </li>
    <li>
        <a href="https://github.com/gnodet/quarkus-camel">Quarkus - Camel example by Guillaume Nodet</a>
    </li>
    <li><a
            href="https://github.com/quarkusio/quarkus/blob/master/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/StreamEmitter.java">Smallrye
            Reactive Messaging Unit Test</a></li>
</ul>